package cn.azzhu.day07

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
 * 使用flink-cep实现订单超时检测
 * @author azzhu
 * @create 2020-09-22 22:06:46
 */
object OrderTimeoutDetect {
  case class OrderEvent(orderId:String,eventType:String,eventTime:Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env.fromElements(
      OrderEvent("order_1", "create", 2000L),
      OrderEvent("order_2", "create", 3000L),
      OrderEvent("order_2", "pay", 4000L)
    )
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)

    //定义规则
    val pattern = Pattern
      .begin[OrderEvent]("create")
      .where(_.eventType.equals("create"))
      .next("pay")
      .where(_.eventType.equals("pay"))
      .within(Time.seconds(5))

    val patternStream = CEP.pattern(stream, pattern)

    //用来输出超时订单的信息
    //超时订单的意思是只有create事件，没有pay事件
    val orderTimeoutOutputTag = new OutputTag[String]("timeout")

    //这个匿名函数用来处理超时的检测
    val timeoutFunc = (map:scala.collection.Map[String,Iterable[OrderEvent]],
                       ts:Long,out:Collector[String]) => {
      println("在 " + ts + "ms之前没有支付！超时了！")
      val orderCrate = map("create").iterator.next()
      out.collect("超时订单的ID为:" + orderCrate.orderId)
    }

    //这个匿名函数用来处理支付成功的检测
    val selectFunc = (map:scala.collection.Map[String,Iterable[OrderEvent]],
                      out:Collector[String]) => {
      val orderPay = map("pay").iterator.next()
      out.collect("订单ID为：" + orderPay.orderId + "支付成功 ")
    }

    val detectStream = patternStream
      //flatSelect 方法接收 科里化参数
      //一参：检测出的超时信息发送到的侧输出标签
      //2参：用来处理超时信息的函数
      //3参：用来处理create和pay匹配成功的信息
      .flatSelect(orderTimeoutOutputTag)(timeoutFunc)(selectFunc)

    //打印匹配成功的信息
    detectStream.print()

    //打印超时信息
    detectStream.getSideOutput(orderTimeoutOutputTag).print()

    env.execute("OrderTimeoutDetect")
  }
}
